Flows Above Normal

Post-crisis analysis

In this worked example we assume the role of an analyst working in the aftermath of a significant crisis in Nepal. Our aim is to use FlowKit to investigate which administrative regions people were displaced from and to during the crisis, following the methodology used in this paper.

The Jupyter notebook for this worked example can be downloaded here, or can be run using the quick start setup.

Load FlowClient and connect to FlowAPI

We start by importing FlowClient. We also import geopandas and mapboxgl, which we will use later to visualise the data.

import flowclient
import os
import numpy as np
import geopandas as gpd
import mapboxgl
from mapboxgl.utils import create_color_stops

We must next generate a FlowAPI access token using FlowAuth. If you are running this notebook using the quick start setup, generating a token requires the following steps:

  1. Visit the FlowAuth login page at http://localhost:9091.
  2. Log in with username TEST_USER and password DUMMY_PASSWORD.
  3. Under "My Servers", select TEST_SERVER.
  4. Click the + button to create a new token.
  5. Give the new token a name, and click SAVE.
  6. Copy the token string using the COPY button.
  7. Paste the token in this notebook as TOKEN.

The steps are the same in a production setup, but the FlowAuth URL, login details and server name will differ.
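Step 7 above can be done by pasting the token string directly into a cell. As an alternative sketch (an assumption of this example, not something FlowKit requires), the token can be supplied through an environment variable such as FLOWAPI_TOKEN so that it is not committed to the notebook:

```python
import os

# Paste the token copied from FlowAuth here, or (assumption) supply it
# via an environment variable to keep it out of version control.
TOKEN = os.environ.get("FLOWAPI_TOKEN", "PASTE_YOUR_TOKEN_HERE")
```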

Once we have a token, we can start a connection to the FlowAPI system.

conn = flowclient.connect(
    url=os.getenv("FLOWAPI_URL", "http://localhost:9090"), token=TOKEN
)

Calculate origin-destination matrices

To estimate abnormal population movements, we will calculate two origin-destination matrices (or flows): a "normal" flow before the crisis occurs, and a "crisis" flow comparing subscriber locations before and during the crisis.

Calculating these two flows requires three reference home locations. In this example we will use modal locations for three periods:

  • A "benchmark" period before the crisis begins,
  • A "comparison" period shortly before the crisis,
  • A "focal" period immediately after the crisis begins.

Here we assume that the crisis begins on 10th February 2016.
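The three periods used below can be expressed relative to this crisis date. This is just a sketch to make the date arithmetic explicit; the queries below simply hardcode the same dates:

```python
from datetime import date, timedelta

# Crisis start date assumed in this example
crisis_start = date(2016, 2, 10)

# Period boundaries used in the modal location queries below
benchmark_start = crisis_start - timedelta(days=40)   # 2016-01-01
comparison_start = crisis_start - timedelta(days=20)  # 2016-01-21
focal_end = crisis_start + timedelta(days=18)         # 2016-02-28
```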

We first call the modal_location_from_dates function three times to create Python dictionaries containing the parameters for the three modal location queries.

home_locations_specs = {
    "benchmark": flowclient.modal_location_from_dates(
        start_date="2016-01-01",
        end_date="2016-01-21",
        method="last",
        aggregation_unit="admin3",
    ),
    "comparison": flowclient.modal_location_from_dates(
        start_date="2016-01-21",
        end_date="2016-02-10",
        method="last",
        aggregation_unit="admin3",
    ),
    "focal": flowclient.modal_location_from_dates(
        start_date="2016-02-10",
        end_date="2016-02-28",
        method="last",
        aggregation_unit="admin3",
    ),
}

Modal location queries return subscriber-level results, which cannot be accessed directly through FlowAPI. We therefore wrap each modal location query specification in the spatial_aggregate function to create a spatially-aggregated query specification, and pass these to the run_query function to set them running. run_query returns a query ID for each spatially-aggregated modal location query.

home_locations_ids = {
    period: flowclient.run_query(
        connection=conn,
        query=flowclient.spatial_aggregate(locations=query_spec),
    )
    for period, query_spec in home_locations_specs.items()
}
home_locations_ids

{'benchmark': 'd972918fb81af0cc0e9b45a4e13d34f8',
 'comparison': '802a2e651138c9861ac94ef172e28ca8',
 'focal': '82e048272804d21aa83c458b4feb0b5a'}

Next, we pass the modal location dictionaries as parameters to the flows function, to create specifications for the two flows queries, and set the resulting queries running as before.

flows_specs = {
    "normal": flowclient.flows(
        from_location=home_locations_specs["benchmark"],
        to_location=home_locations_specs["comparison"],
        aggregation_unit="admin3",
    ),
    "crisis": flowclient.flows(
        from_location=home_locations_specs["benchmark"],
        to_location=home_locations_specs["focal"],
        aggregation_unit="admin3",
    ),
}
flows_ids = {
    flow: flowclient.run_query(connection=conn, query=query_spec)
    for flow, query_spec in flows_specs.items()
}
flows_ids

{'crisis': '5b0e2aefbd731c13e9db3368207c99bd',
 'normal': '293d75ae3e50429bd12bb2b420457e34'}

We can periodically check the status of the queries using the get_status function.

flowclient.get_status(connection=conn, query_id=flows_ids["crisis"])

queued
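Rather than re-running the status check by hand, a small helper can poll until the query reaches a terminal state. This is a sketch, not part of flowclient itself, and the terminal status strings other than "completed" are assumptions:

```python
import time

def wait_until_finished(fetch_status, poll_interval=5, max_polls=120):
    """Poll a status-returning callable until the query reaches a
    terminal state. Helper sketch -- not part of flowclient itself."""
    for _ in range(max_polls):
        status = fetch_status()
        if status in ("completed", "errored", "cancelled"):
            return status
        time.sleep(poll_interval)
    raise TimeoutError("Query did not finish within the polling budget")

# Usage, reusing the connection and query IDs from the cells above:
# wait_until_finished(
#     lambda: flowclient.get_status(connection=conn, query_id=flows_ids["crisis"])
# )
```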

Visualise the distributions of locations

While the flows are calculating, we download the geography for the level 3 administrative regions as GeoJSON using the get_geography function.

# Download geography data as GeoJSON
regions = flowclient.get_geography(connection=conn, aggregation_unit="admin3")

# Create a geopandas GeoDataFrame from the GeoJSON
regions_geodataframe = gpd.GeoDataFrame.from_features(regions)

We can call get_result to get the results of the modal location queries as pandas DataFrames.

home_locations_results = {
    period: flowclient.get_result(
        connection=conn,
        query=flowclient.spatial_aggregate(locations=query_spec),
    )
    for period, query_spec in home_locations_specs.items()
}

We combine these results with the geography data, and use the Mapbox GL library to create a choropleth showing the distribution of modal locations.

Note: Mapbox requires an access token, which should be set as the environment variable MAPBOX_ACCESS_TOKEN. This token is only needed for producing the Mapbox visualisations, which are completely separate from FlowKit.

home_locations_geodataframe = regions_geodataframe.drop(columns="centroid")

for period in home_locations_specs.keys():
    home_locations_geodataframe = home_locations_geodataframe.join(
        home_locations_results[period]
        .set_index("pcod")
        .rename(columns={"total": f"Total ({period} period)"}),
        on="pcod",
        how="left",
    ).fillna(value={f"Total ({period} period)": 0})

home_locations_geodataframe = home_locations_geodataframe.rename(
    columns={"pcod": "P-code"}
)
period_to_show = "benchmark"  # or "comparison", "focal"

mapbox_token = os.environ["MAPBOX_ACCESS_TOKEN"]

# Colour scale for legend
max_total = max(
    [
        home_locations_geodataframe[f"Total ({period} period)"].max()
        for period in home_locations_specs.keys()
    ]
)
color_stops = create_color_stops(np.linspace(0, max_total, 9), colors="YlGn")

modal_locations_viz = mapboxgl.ChoroplethViz(
    home_locations_geodataframe.__geo_interface__,
    access_token=mapbox_token,
    color_property=f"Total ({period_to_show} period)",
    color_stops=color_stops,
    opacity=0.8,
    line_color="black",
    line_width=0.5,
    legend_gradient=True,
    legend_layout="horizontal",
    legend_text_numeric_precision=0,
    below_layer="waterway-label",
    center=(84.1, 28.4),
    zoom=5.5,
)

modal_locations_viz.show()

Calculate flows above normal

Once the flows queries have finished running, we can retrieve their results. We can either use the get_result function, as we did above for the modal locations, or call get_result_by_query_id and pass the query ID of the flows query.

flows_results = {
    flow: flowclient.get_result_by_query_id(connection=conn, query_id=query_id)
    for flow, query_id in flows_ids.items()
}

We subtract the "normal" flow from the "crisis" flow to find the flows above normal during the crisis period.

flows_above_normal = (
    flows_results["crisis"]
    .set_index(["pcod_from", "pcod_to"])
    .subtract(
        flows_results["normal"].set_index(["pcod_from", "pcod_to"]),
        fill_value=0,
    )
    .reset_index()
)
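The fill_value=0 argument matters here: an origin-destination pair that appears in only one of the two flows would otherwise produce NaN instead of a signed difference. A toy illustration with made-up counts (the column name "count" matches the flows results used below):

```python
import pandas as pd

# Made-up flows: the (A, C) pair appears only in the "crisis" flow
crisis = pd.DataFrame(
    {"pcod_from": ["A", "A"], "pcod_to": ["B", "C"], "count": [30, 5]}
).set_index(["pcod_from", "pcod_to"])
normal = pd.DataFrame(
    {"pcod_from": ["A"], "pcod_to": ["B"], "count": [10]}
).set_index(["pcod_from", "pcod_to"])

# With fill_value=0, the missing (A, C) entry in "normal" is treated
# as zero, so the difference is 5 rather than NaN
above_normal = crisis.subtract(normal, fill_value=0).reset_index()
```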

We can now aggregate the flows above normal to the "pcod_to" or "pcod_from" regions, excluding flows where the origin and destination regions are the same, to get the inflows/outflows above normal, respectively.

inflows_above_normal = (
    flows_above_normal[
        flows_above_normal["pcod_from"] != flows_above_normal["pcod_to"]
    ]
    .groupby("pcod_to")
    .sum()
)

outflows_above_normal = (
    flows_above_normal[
        flows_above_normal["pcod_from"] != flows_above_normal["pcod_to"]
    ]
    .groupby("pcod_from")
    .sum()
)

As with the modal locations, we can combine these results with the geography data to display the data on choropleth maps.

in_out_flows_geodataframe = (
    regions_geodataframe.set_index("pcod")
    .join(
        [
            inflows_above_normal.rename(
                columns={"count": "inflow above normal"}
            ),
            outflows_above_normal.rename(
                columns={"count": "outflow above normal"}
            ),
        ],
        how="left",
    )
    .fillna(value={"inflow above normal": 0, "outflow above normal": 0})
    .reset_index()
)

in_out_flows_geodataframe = in_out_flows_geodataframe.drop(
    columns="centroid"
).rename(columns={"pcod": "P-code"})
direction_to_show = "in"  # or "out"

mapbox_token = os.environ["MAPBOX_ACCESS_TOKEN"]

# Colour scale for legend
max_count = max(
    [
        in_out_flows_geodataframe[f"{direction}flow above normal"].abs().max()
        for direction in ["in", "out"]
    ]
)
color_stops = create_color_stops(
    np.linspace(-max_count, max_count, 11), colors="PiYG"
)

flows_viz = mapboxgl.ChoroplethViz(
    in_out_flows_geodataframe.__geo_interface__,
    access_token=mapbox_token,
    color_property=f"{direction_to_show}flow above normal",
    color_stops=color_stops,
    opacity=0.8,
    line_color="black",
    line_width=0.5,
    legend_gradient=True,
    legend_layout="horizontal",
    legend_text_numeric_precision=0,
    below_layer="waterway-label",
    center=(84.1, 28.4),
    zoom=5.5,
)

flows_viz.show()